package com.innovation.ic.sc.web.listener;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.innovation.ic.b1b.framework.manager.ThreadPoolManager;
import com.innovation.ic.sc.base.handler.sc.ModelHandler;
import com.innovation.ic.sc.base.pojo.constant.ScTableName;
import com.innovation.ic.sc.base.pojo.constant.config.DatabaseGlobal;
import com.innovation.ic.sc.base.service.ServiceHelper;
import com.innovation.ic.sc.base.service.sc.InventoryService;
import com.innovation.ic.sc.base.thread.listener.kafka.AdvantageBrandListenHandleThread;
import com.innovation.ic.sc.base.thread.listener.kafka.AdvantageModeListenHandleThread;
import com.innovation.ic.sc.base.thread.listener.kafka.InventoryListenHandleThread;
import com.innovation.ic.sc.base.thread.listener.kafka.UserListenHandleThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

/**
 * 当出现数据库操作后，调用这个监听器的方法
 */
@Component
@EnableScheduling
public class KafkaListener {
    @Resource
    private ServiceHelper serviceHelper;

    @Resource
    private ModelHandler modelHandler;

    @Resource
    private ThreadPoolManager threadPoolManager;

    @Resource
    private InventoryService inventoryService;

    /**
     * 当出现数据库操作后，调用这个方法
     * @param consumer 数据库变动内容
     */
    @org.springframework.kafka.annotation.KafkaListener(topics = "sc")
    public void listen(ConsumerRecord<?, ?> consumer) {
        String json = (String) consumer.value();
        JSONObject metaDataJsonObject = JSONObject.parseObject(json);
        // 数据库名称
        String database = metaDataJsonObject.getString("database");
        // 表名称
        String table = metaDataJsonObject.getString("table");
        // 操作类型
        String type = metaDataJsonObject.getString("type");
        // 操作数据
        JSONArray dataJsonArray = metaDataJsonObject.getJSONArray("data");

        if (null != dataJsonArray && dataJsonArray.size() > 0) {
            JSONObject dataJsonObject = dataJsonArray.getJSONObject(0);
            // sc、sc-test、sc-prod数据库
            if (DatabaseGlobal.SC.equals(database) || DatabaseGlobal.SC_TEST.equals(database) || DatabaseGlobal.SC_PROD.equals(database)) {
                // user表
                if (table.equals(ScTableName.USER)) {
                    UserListenHandleThread userListenHandleThread = new UserListenHandleThread(type, consumer, dataJsonObject, modelHandler, serviceHelper);
                    threadPoolManager.execute(userListenHandleThread);
                }

                // 优势品牌表
//                if (table.equals(ScTableName.ADVANTAGE_BRAND)) {
//                    AdvantageBrandListenHandleThread advantageBrandListenHandleThread = new AdvantageBrandListenHandleThread(type, consumer, dataJsonObject, modelHandler, inventoryService);
//                    threadPoolManager.execute(advantageBrandListenHandleThread);
//                }

                // 优势型号表
                if (table.equals(ScTableName.ADVANTAGE_MODEL)) {
                    AdvantageModeListenHandleThread advantageModeListenHandleThread = new AdvantageModeListenHandleThread(type, consumer, dataJsonObject, modelHandler, serviceHelper, inventoryService);
                    threadPoolManager.execute(advantageModeListenHandleThread);
                }

                // 库存表
                if (table.equals(ScTableName.INVENTORY)) {
                    InventoryListenHandleThread inventoryListenHandleThread = new InventoryListenHandleThread(type, consumer, dataJsonObject, modelHandler, serviceHelper, inventoryService);
                    threadPoolManager.execute(inventoryListenHandleThread);
                }

                // 人员管理表
                // 这个先在不用了
//                if (table.equals(ScTableName.PERSONNEL_MANAGEMENT)) {
//                    // 人员管理编辑更新
//                    PersonnelManagementListenHandleThread kafkaPersonnelManagementListenHandleThread = new PersonnelManagementListenHandleThread(type, consumer, dataJsonObject, serviceHelper, personnelManagementMapper);
//                    threadPoolManager.execute(kafkaPersonnelManagementListenHandleThread);
//                }

                //品牌 备注:此表暂时不用
                /*if (table.equals(ScTableName.BRAND)) {
                    BrandListenHandleThread brandListenHandleThread = new BrandListenHandleThread(type, consumer, dataJsonObject, modelHandler, serviceHelper);
                    threadPoolManager.execute(brandListenHandleThread);
                }*/
            }
        }
    }
}